Connectivity Software User's Guide and Reference
Subscribing to Data in Sparkplug Consumer

Introduction

Subscribing to data is the most common task of Sparkplug host applications (consumers). With Rapid Toolkit for Sparkplug, you make a data subscription with a single method call. You do not have to connect to the Sparkplug broker explicitly, and you do not need the boilerplate code that other toolkits require.

For efficiency, Sparkplug allows producers to "bundle" multiple metrics in a single message payload. Rapid Toolkit for Sparkplug lets you work with the bundled metrics, or it can unbundle them for you. Unbundling is useful in applications that treat every metric as a data stream completely independent of the others.
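
The difference between the two modes can be pictured with a minimal, self-contained sketch. It illustrates the concept only: the PayloadUnbundler class and the dictionary standing in for a payload are hypothetical names invented here, not types of Rapid Toolkit for Sparkplug, which does the unbundling for you internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only: these are not Rapid Toolkit for Sparkplug types.
public static class PayloadUnbundler
{
    // Turns one bundled payload (metric name -> value) into a list of
    // independent per-metric items, one for each metric in the payload.
    public static List<(string MetricName, object Value)> Unbundle(
        IReadOnlyDictionary<string, object> bundledPayload)
    {
        if (bundledPayload == null)
            throw new ArgumentNullException(nameof(bundledPayload));

        var items = new List<(string MetricName, object Value)>();
        foreach (KeyValuePair<string, object> pair in bundledPayload)
            items.Add((pair.Key, pair.Value));
        return items;
    }
}
```

Each returned item can then be processed on its own, the way a single metric notification would be, while code that prefers the bundled form works with the whole dictionary at once.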

The data you subscribe to is delivered to your code through data notifications, in the form of events or callbacks (you can choose). See Data Notifications in Sparkplug Consumer for a discussion of the data notifications, their meaning and contents. If you use event handlers to process the data notifications, add them before making the subscription; otherwise, the initial data notifications might be lost.

Rapid Toolkit for Sparkplug also uses the data notifications to provide information related to the status of the MQTT connection.

Filters in Subscriptions

When subscribing to data, you need to identify what you are subscribing to. This requires you to specify the Sparkplug groups, edge nodes, devices (optional), and metrics. In Rapid Toolkit for Sparkplug, any of these items can be specified exactly, by providing its ID or name, or by using a filter. A filter allows you to subscribe to a wider range of data with a single call. For example, you can subscribe to all metrics in a given edge node, or in *any* edge node in the Sparkplug system.

Rapid Toolkit for Sparkplug uses a modified concept of MQTT topic filters (with optional wildcards) for its filters.

For Sparkplug group IDs, edge node IDs, and device IDs, you can use a single "#" or "+" instead of a concrete ID to denote "any ID". For example, "#" in place of an edge node ID means "any edge node".

For Sparkplug metric names, you can use the full MQTT topic filter syntax to specify the metrics you are interested in. For example, "Folder1/+/MyMetric" matches a metric name that starts with the segment "Folder1" and ends with the segment "MyMetric", with a single segment in the middle that can be anything. The metric name filter "+" denotes a metric name with a single segment; the metric name filter "#" denotes any metric name whatsoever. Note: In Sparkplug, metric names do not participate in forming the MQTT topic names, and MQTT topic filtering itself therefore cannot be used to select metrics. Metric name filtering is a functionality added by Rapid Toolkit for Sparkplug, which simply reuses the familiar syntax for a somewhat different purpose.
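
To make the filter rules above concrete, the following sketch shows one way such matching could be written. It is not the toolkit's implementation: the FilterSketch class and its method names are hypothetical, and it assumes standard MQTT matching semantics ("+" matches exactly one segment, "#" must come last and matches any remaining segments) with case-sensitive comparison.

```csharp
using System;

// Hypothetical helper for illustration; not part of Rapid Toolkit for Sparkplug.
public static class FilterSketch
{
    // Group, edge node and device IDs: a lone "#" or "+" means "any ID",
    // anything else must equal the ID exactly (assumed case-sensitive).
    public static bool IdMatches(string idFilter, string id)
    {
        if (idFilter == "#" || idFilter == "+")
            return true;
        return string.Equals(idFilter, id, StringComparison.Ordinal);
    }

    // Metric names: MQTT-style topic filter matching over '/'-separated segments.
    public static bool MetricNameMatches(string filter, string metricName)
    {
        string[] filterSegments = filter.Split('/');
        string[] nameSegments = metricName.Split('/');

        for (int i = 0; i < filterSegments.Length; i++)
        {
            string segment = filterSegments[i];

            // "#" matches the rest of the name, including no further segments.
            // It is only accepted as the last segment of the filter.
            if (segment == "#")
                return i == filterSegments.Length - 1;

            // The name has run out of segments, but the filter expects more.
            if (i >= nameSegments.Length)
                return false;

            // "+" matches exactly one segment, whatever it contains.
            if (segment != "+" && !string.Equals(segment, nameSegments[i], StringComparison.Ordinal))
                return false;
        }

        // Every filter segment matched; the name must not have extra segments.
        return filterSegments.Length == nameSegments.Length;
    }
}
```

With this sketch, FilterSketch.MetricNameMatches("Folder1/+/MyMetric", "Folder1/Pump/MyMetric") is true, while the same filter rejects "Folder1/A/B/MyMetric" because "+" stands for a single segment only.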

Subscribing to Metrics

In order to subscribe to data notifications for individual Sparkplug metrics, call one of the metric subscription methods, such as the SubscribeEdgeNodeMetric or SubscribeDeviceMetric method used in the examples below.

The subscription methods return an integer handle, which can later be used to unsubscribe from the metric.

The following example illustrates subscribing to all metrics directly on a given edge node.

.NET

// This example shows how to subscribe to all metrics of a given edge node and display the name and value of the metric with
// each notification.
//
// In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using System;
using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;

namespace SparkplugDocExamples.Consumer._EasySparkplugConsumer
{
    partial class SubscribeEdgeNodeMetric
    {
        public static void Overload1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the consumer object and hook events.
            var consumer = new EasySparkplugConsumer();
            consumer.MetricNotification += consumer_Overload1_MetricNotification;

            Console.WriteLine("Subscribing...");
            // In this example, we specify the precise Sparkplug group ID and edge node ID, but allow any metric name.
            consumer.SubscribeEdgeNodeMetric(hostDescriptor, 
                "easyGroup", "easySparkplugDemo", "#");

            Console.WriteLine("Processing notifications for 20 seconds...");
            System.Threading.Thread.Sleep(20 * 1000);

            Console.WriteLine("Unsubscribing...");
            consumer.UnsubscribeAllMetrics();

            Console.WriteLine("Waiting for 5 seconds...");
            System.Threading.Thread.Sleep(5 * 1000);

            Console.WriteLine("Finished.");
        }


        static void consumer_Overload1_MetricNotification(object sender, EasySparkplugMetricNotificationEventArgs eventArgs)
        {
            // Handle different types of notifications.
            Console.WriteLine();
            switch (eventArgs.NotificationType)
            {
                case SparkplugNotificationType.Connect:
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.");
                    break;
                case SparkplugNotificationType.Disconnect:
                    Console.WriteLine("Disconnected from Sparkplug host.");
                    break;
                case SparkplugNotificationType.Data:
                    Console.WriteLine("Received data from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Birth:
                    Console.WriteLine("Received birth message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Death:
                    Console.WriteLine("Received death message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    break;
            }
            if (!eventArgs.Succeeded)
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}");
        }
    }
}

' This example shows how to subscribe to all metrics of a given edge node and display the name and value of the metric with
' each notification.
'
' In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

Imports System
Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel

Namespace Global.SparkplugDocExamples.Consumer._EasySparkplugConsumer
    Partial Class SubscribeEdgeNodeMetric
        Public Shared Sub Overload1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the consumer object and hook events.
            Dim consumer = New EasySparkplugConsumer()
            AddHandler consumer.MetricNotification, AddressOf consumer_Overload1_MetricNotification

            Console.WriteLine("Subscribing...")
            ' In this example, we specify the precise Sparkplug group ID and edge node ID, but allow any metric name.
            consumer.SubscribeEdgeNodeMetric(hostDescriptor,
                "easyGroup", "easySparkplugDemo", "#")

            Console.WriteLine("Processing notifications for 20 seconds...")
            Threading.Thread.Sleep(20 * 1000)

            Console.WriteLine("Unsubscribing...")
            consumer.UnsubscribeAllMetrics()

            Console.WriteLine("Waiting for 5 seconds...")
            Threading.Thread.Sleep(5 * 1000)

            Console.WriteLine("Finished.")
        End Sub

        Private Shared Sub consumer_Overload1_MetricNotification(ByVal sender As Object, ByVal eventArgs As EasySparkplugMetricNotificationEventArgs)
            ' Handle different types of notifications.
            Console.WriteLine()
            Select Case eventArgs.NotificationType
                Case SparkplugNotificationType.Connect
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.")
                Case SparkplugNotificationType.Disconnect
                    Console.WriteLine("Disconnected from Sparkplug host.")
                Case SparkplugNotificationType.Data
                    Console.WriteLine("Received data from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Birth
                    Console.WriteLine("Received birth message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Death
                    Console.WriteLine("Received death message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
            End Select
            If Not eventArgs.Succeeded Then
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}")
            End If
        End Sub
    End Class
End Namespace

 

The example below illustrates subscribing to all metrics on a given device.

.NET

// This example shows how to subscribe to all metrics of a given device and display the name and value of the metric with
// each notification.
//
// In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

using System;
using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;

namespace SparkplugDocExamples.Consumer._EasySparkplugConsumer
{
    partial class SubscribeDeviceMetric
    {
        public static void Overload1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the consumer object and hook events.
            var consumer = new EasySparkplugConsumer();
            consumer.MetricNotification += consumer_Overload1_MetricNotification;

            Console.WriteLine("Subscribing...");
            // In this example, we specify the precise Sparkplug group ID, edge node ID and device ID, but allow any metric name.
            consumer.SubscribeDeviceMetric(hostDescriptor, 
                "easyGroup", "easySparkplugDemo", "data", "#");

            Console.WriteLine("Processing notifications for 20 seconds...");
            System.Threading.Thread.Sleep(20 * 1000);

            Console.WriteLine("Unsubscribing...");
            consumer.UnsubscribeAllMetrics();

            Console.WriteLine("Waiting for 5 seconds...");
            System.Threading.Thread.Sleep(5 * 1000);

            Console.WriteLine("Finished.");
        }


        static void consumer_Overload1_MetricNotification(object sender, EasySparkplugMetricNotificationEventArgs eventArgs)
        {
            // Handle different types of notifications.
            Console.WriteLine();
            switch (eventArgs.NotificationType)
            {
                case SparkplugNotificationType.Connect:
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.");
                    break;
                case SparkplugNotificationType.Disconnect:
                    Console.WriteLine("Disconnected from Sparkplug host.");
                    break;
                case SparkplugNotificationType.Data:
                    Console.WriteLine("Received data from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Birth:
                    Console.WriteLine("Received birth message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Death:
                    Console.WriteLine("Received death message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    break;
            }
            if (!eventArgs.Succeeded)
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}");
        }
    }
}

' This example shows how to subscribe to all metrics of a given device and display the name and value of the metric with
' each notification.
'
' In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

Imports System
Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel

Namespace Global.SparkplugDocExamples.Consumer._EasySparkplugConsumer
    Partial Class SubscribeDeviceMetric
        Public Shared Sub Overload1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the consumer object and hook events.
            Dim consumer = New EasySparkplugConsumer()
            AddHandler consumer.MetricNotification, AddressOf consumer_Overload1_MetricNotification

            Console.WriteLine("Subscribing...")
            ' In this example, we specify the precise Sparkplug group ID, edge node ID and device ID, but allow any metric name.
            consumer.SubscribeDeviceMetric(hostDescriptor,
                "easyGroup", "easySparkplugDemo", "data", "#")

            Console.WriteLine("Processing notifications for 20 seconds...")
            Threading.Thread.Sleep(20 * 1000)

            Console.WriteLine("Unsubscribing...")
            consumer.UnsubscribeAllMetrics()

            Console.WriteLine("Waiting for 5 seconds...")
            Threading.Thread.Sleep(5 * 1000)

            Console.WriteLine("Finished.")
        End Sub

        Private Shared Sub consumer_Overload1_MetricNotification(ByVal sender As Object, ByVal eventArgs As EasySparkplugMetricNotificationEventArgs)
            ' Handle different types of notifications.
            Console.WriteLine()
            Select Case eventArgs.NotificationType
                Case SparkplugNotificationType.Connect
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.")
                Case SparkplugNotificationType.Disconnect
                    Console.WriteLine("Disconnected from Sparkplug host.")
                Case SparkplugNotificationType.Data
                    Console.WriteLine("Received data from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Birth
                    Console.WriteLine("Received birth message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Death
                    Console.WriteLine("Received death message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
            End Select
            If Not eventArgs.Succeeded Then
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}")
            End If
        End Sub
    End Class
End Namespace

 

Subscribing to Payloads

In order to subscribe to data notifications for incoming Sparkplug payloads, call one of the payload subscription methods, such as the SubscribeEdgeNodePayload method used in the example below.

Normally, "#" is used as a metric name filter, meaning that all metrics will be included in the data notification. There is nothing that prevents you, however, from specifying a narrower filter, if you have such a need.

The subscription methods return an integer handle, which can later be used to unsubscribe from the payload.

By default, Rapid Toolkit for Sparkplug puts all metrics of the edge node or device in the notification, including those that were not explicitly sent in the received data message. This behavior can be changed; see Complete and Incomplete Datasets further below in this article.
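
The idea of a notification that carries all metrics, not just those in the latest message, can be sketched as follows. This is a conceptual illustration under stated assumptions, not a description of how Rapid Toolkit for Sparkplug works internally: the CompleteDatasetSketch class is a hypothetical name, and plain dictionaries stand in for Sparkplug payloads.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not a Rapid Toolkit for Sparkplug type.
public sealed class CompleteDatasetSketch
{
    // Last known value of every metric seen so far (from birth and data messages).
    private readonly Dictionary<string, object> _lastKnown = new Dictionary<string, object>();

    // Merges the metrics of one received message and returns a complete snapshot:
    // metrics present in the message take their new values, all others keep
    // their last known values.
    public IReadOnlyDictionary<string, object> Merge(
        IEnumerable<KeyValuePair<string, object>> receivedMetrics)
    {
        if (receivedMetrics == null)
            throw new ArgumentNullException(nameof(receivedMetrics));

        foreach (KeyValuePair<string, object> metric in receivedMetrics)
            _lastKnown[metric.Key] = metric.Value;

        // Return a copy so that later messages do not alter snapshots already handed out.
        return new Dictionary<string, object>(_lastKnown);
    }
}
```

In this sketch, a data message that carries only one changed metric still yields a snapshot containing every metric announced earlier, which corresponds to the complete form of the dataset described above.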

The following example illustrates subscribing to the payload with all metrics of a given edge node.

.NET

// This example shows how to subscribe to the payload with all metrics of a given edge node and display the received payload
// with each notification.
//
// In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

using System;
using System.Collections.Generic;
using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;

namespace SparkplugDocExamples.Consumer._EasySparkplugConsumer
{
    class SubscribeEdgeNodePayload
    {
        public static void Overload1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the consumer object and hook events.
            var consumer = new EasySparkplugConsumer();
            consumer.PayloadNotification += consumer_Overload1_PayloadNotification;

            Console.WriteLine("Subscribing...");
            consumer.SubscribeEdgeNodePayload(hostDescriptor, "easyGroup", "easySparkplugDemo");

            Console.WriteLine("Processing notifications for 20 seconds...");
            System.Threading.Thread.Sleep(20 * 1000);

            Console.WriteLine("Unsubscribing...");
            consumer.UnsubscribeAllPayloads();

            Console.WriteLine("Waiting for 5 seconds...");
            System.Threading.Thread.Sleep(5 * 1000);

            Console.WriteLine("Finished.");
        }


        static void consumer_Overload1_PayloadNotification(object sender, EasySparkplugPayloadNotificationEventArgs eventArgs)
        {
            // Handle different types of notifications.
            Console.WriteLine();
            switch (eventArgs.NotificationType)
            {
                case SparkplugNotificationType.Connect:
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.");
                    break;
                case SparkplugNotificationType.Disconnect:
                    Console.WriteLine("Disconnected from Sparkplug host.");
                    break;
                case SparkplugNotificationType.Data:
                case SparkplugNotificationType.Birth:
                    Console.WriteLine("Received birth or data message from Sparkplug host.");
                    // Display the metric name and data for each metric delivered in the payload.
                    foreach (KeyValuePair<string, SparkplugMetricElement> pair in eventArgs.Payload)
                        Console.WriteLine($"{pair.Key}: {pair.Value.MetricData}");
                    break;
                case SparkplugNotificationType.Death:
                    Console.WriteLine("Received death message from Sparkplug host.");
                    break;
            }
            if (!eventArgs.Succeeded)
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}");
        }
    }
}

' This example shows how to subscribe to the payload with all metrics of a given edge node and display the received payload
' with each notification.
'
' In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

Imports System
Imports System.Collections.Generic
Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel

Namespace Global.SparkplugDocExamples.Consumer._EasySparkplugConsumer
    Class SubscribeEdgeNodePayload
        Public Shared Sub Overload1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the consumer object and hook events.
            Dim consumer = New EasySparkplugConsumer()
            AddHandler consumer.PayloadNotification, AddressOf consumer_Overload1_PayloadNotification

            Console.WriteLine("Subscribing...")
            consumer.SubscribeEdgeNodePayload(hostDescriptor, "easyGroup", "easySparkplugDemo")

            Console.WriteLine("Processing notifications for 20 seconds...")
            Threading.Thread.Sleep(20 * 1000)

            Console.WriteLine("Unsubscribing...")
            consumer.UnsubscribeAllPayloads()

            Console.WriteLine("Waiting for 5 seconds...")
            Threading.Thread.Sleep(5 * 1000)

            Console.WriteLine("Finished.")
        End Sub

        Private Shared Sub consumer_Overload1_PayloadNotification(ByVal sender As Object, ByVal eventArgs As EasySparkplugPayloadNotificationEventArgs)
            ' Handle different types of notifications.
            Console.WriteLine()
            Select Case eventArgs.NotificationType
                Case SparkplugNotificationType.Connect
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.")
                Case SparkplugNotificationType.Disconnect
                    Console.WriteLine("Disconnected from Sparkplug host.")
                Case SparkplugNotificationType.Data,
                    SparkplugNotificationType.Birth
                    Console.WriteLine("Received birth or data message from Sparkplug host.")
                    ' Display the metric name and data for each metric delivered in the payload.
                    For Each pair As KeyValuePair(Of String, SparkplugMetricElement) In eventArgs.Payload
                        Console.WriteLine($"{pair.Key}: {pair.Value.MetricData}")
                    Next
                Case SparkplugNotificationType.Death
                    Console.WriteLine("Received death message from Sparkplug host.")
            End Select
            If Not eventArgs.Succeeded Then
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}")
            End If
        End Sub
    End Class
End Namespace

 

Unsubscribing

When you subscribe to metrics or payloads in Rapid Toolkit for Sparkplug, the subscriptions stay alive until you explicitly unsubscribe, or until the consumer object (an instance of the EasySparkplugConsumer Class) is disposed of. The subscriptions also outlive possible disconnections from the MQTT broker, caused for example by network disruptions or computer failures.

In order to unsubscribe from a metric, call the UnsubscribeMetric Method and pass it the metric subscription handle returned to you when you called a metric subscription method. You can also call the UnsubscribeAllMetrics Method to unsubscribe from all metrics at once. The metric subscription handle is no longer valid after you make the method call.

In order to unsubscribe from a payload, call the UnsubscribePayload Method and pass it the payload subscription handle returned to you when you called a payload subscription method. You can also call the UnsubscribeAllPayloads Method to unsubscribe from all payloads at once. The payload subscription handle is no longer valid after you make the method call.

In order to unsubscribe "from everything" (all metrics and payloads), use the UnsubscribeAll Method (an extension method).
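
Because a handle becomes invalid once it has been used to unsubscribe, an application with many subscriptions may want to keep its handles in one place. The following sketch shows one possible way to do so. The SubscriptionHandleBook class is a hypothetical application-side helper invented for this illustration; it is not part of Rapid Toolkit for Sparkplug and only stores the integer handles that the subscription methods return.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical application-side helper; not part of Rapid Toolkit for Sparkplug.
public sealed class SubscriptionHandleBook
{
    private readonly Dictionary<string, int> _handlesByKey = new Dictionary<string, int>();
    private readonly Action<int> _unsubscribe;

    // 'unsubscribe' is the action that releases one handle, for example a lambda
    // that calls the UnsubscribeMetric method of the consumer object.
    public SubscriptionHandleBook(Action<int> unsubscribe)
    {
        _unsubscribe = unsubscribe ?? throw new ArgumentNullException(nameof(unsubscribe));
    }

    // Number of handles currently remembered.
    public int Count => _handlesByKey.Count;

    // Remembers the handle returned by a subscription call under an
    // application-chosen key. Throws if the key is already in use.
    public void Remember(string key, int handle)
    {
        _handlesByKey.Add(key, handle);
    }

    // Unsubscribes the handle stored under 'key' and forgets it, because the
    // handle is no longer valid afterwards. Returns false if the key is unknown.
    public bool Release(string key)
    {
        if (!_handlesByKey.TryGetValue(key, out int handle))
            return false;

        _handlesByKey.Remove(key);
        _unsubscribe(handle);
        return true;
    }
}
```

With a consumer object like the one in the examples of this topic, the helper could be created as new SubscriptionHandleBook(handle => consumer.UnsubscribeMetric(handle)), given the value returned by SubscribeEdgeNodeMetric through Remember, and later asked to Release the same key. A second Release of that key returns false instead of passing a stale handle to the consumer.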

The following example illustrates how to unsubscribe from a previously subscribed metric.

.NET

// This example shows how to unsubscribe from a specified metric subscription (which can be for a single metric, or for all
// metrics passing the specified name filter).
//
// In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

using System;
using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;

namespace SparkplugDocExamples.Consumer._EasySparkplugConsumer
{
    class UnsubscribeMetric
    {
        public static void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the consumer object and hook events.
            var consumer = new EasySparkplugConsumer();
            consumer.MetricNotification += consumer_Main1_MetricNotification;

            Console.WriteLine("Subscribing...");
            // In this example, we specify the precise Sparkplug group ID and edge node ID, but allow any metric name.
            int handle = consumer.SubscribeEdgeNodeMetric(hostDescriptor, 
                "easyGroup", "easySparkplugDemo", "#");

            Console.WriteLine("Processing notifications for 20 seconds...");
            System.Threading.Thread.Sleep(20 * 1000);

            Console.WriteLine("Unsubscribing...");
            consumer.UnsubscribeMetric(handle);

            Console.WriteLine("Waiting for 5 seconds...");
            System.Threading.Thread.Sleep(5 * 1000);

            Console.WriteLine("Finished.");
        }


        static void consumer_Main1_MetricNotification(object sender, EasySparkplugMetricNotificationEventArgs eventArgs)
        {
            // Handle different types of notifications.
            Console.WriteLine();
            switch (eventArgs.NotificationType)
            {
                case SparkplugNotificationType.Connect:
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.");
                    break;
                case SparkplugNotificationType.Disconnect:
                    Console.WriteLine("Disconnected from Sparkplug host.");
                    break;
                case SparkplugNotificationType.Data:
                    Console.WriteLine("Received data from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Birth:
                    Console.WriteLine("Received birth message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}");
                    break;
                case SparkplugNotificationType.Death:
                    Console.WriteLine("Received death message from Sparkplug host.");
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}");
                    break;
            }
            if (!eventArgs.Succeeded)
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}");
        }
    }
}

' This example shows how to unsubscribe from a specified metric subscription (which can be for a single metric, or for all
' metrics passing the specified name filter).
'
' In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.

Imports System
Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel

Namespace Global.SparkplugDocExamples.Consumer._EasySparkplugConsumer
    Class UnsubscribeMetric
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the consumer object and hook events.
            Dim consumer = New EasySparkplugConsumer()
            AddHandler consumer.MetricNotification, AddressOf consumer_Main1_MetricNotification

            Console.WriteLine("Subscribing...")
            ' In this example, we specify the precise Sparkplug group ID and edge node ID, but allow any metric name.
            Dim handle As Integer = consumer.SubscribeEdgeNodeMetric(hostDescriptor,
                "easyGroup", "easySparkplugDemo", "#")

            Console.WriteLine("Processing notifications for 20 seconds...")
            Threading.Thread.Sleep(20 * 1000)

            Console.WriteLine("Unsubscribing...")
            consumer.UnsubscribeMetric(handle)

            Console.WriteLine("Waiting for 5 seconds...")
            Threading.Thread.Sleep(5 * 1000)

            Console.WriteLine("Finished.")
        End Sub

        Private Shared Sub consumer_Main1_MetricNotification(ByVal sender As Object, ByVal eventArgs As EasySparkplugMetricNotificationEventArgs)
            ' Handle different types of notifications.
            Console.WriteLine()
            Select Case eventArgs.NotificationType
                Case SparkplugNotificationType.Connect
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.")
                Case SparkplugNotificationType.Disconnect
                    Console.WriteLine("Disconnected from Sparkplug host.")
                Case SparkplugNotificationType.Data
                    Console.WriteLine("Received data from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Birth
                    Console.WriteLine("Received birth message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
                    Console.WriteLine($"Value: {eventArgs.MetricData?.Value}")
                Case SparkplugNotificationType.Death
                    Console.WriteLine("Received death message from Sparkplug host.")
                    Console.WriteLine($"Metric name: {eventArgs.MetricName}")
            End Select
            If Not eventArgs.Succeeded Then
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}")
            End If
        End Sub
    End Class
End Namespace

 

Complete and Incomplete Datasets

Sparkplug edge nodes and devices are required to specify all their metrics when they publish their Birth messages. The subsequent Data messages from the Sparkplug edge nodes and devices can contain only the metrics that have changed (we call this the incomplete dataset). If the consumer wants a complete picture of all the metrics in the edge node or device (we call this the complete dataset), it needs to put together the last known data with the data from the newest update. Rapid Toolkit for Sparkplug does this for you. That is, when you subscribe to an edge node or device payload, the notifications that you receive contain all the metrics by default. The metrics that were just received are included with their new data; any metrics that were not contained in the received Data message but were specified in the Birth message are filled in with their last known data.
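
The idea of combining the last known data with a partial update can be illustrated with a minimal sketch. It uses only plain .NET dictionaries; the class name CompleteDataSetSketch, the Merge method and the metric names are hypothetical and are not part of Rapid Toolkit for Sparkplug. It is not meant to describe how the toolkit implements this internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not a Rapid Toolkit for Sparkplug type.
public static class CompleteDataSetSketch
{
    // Applies a partial update (the incomplete dataset) on top of the last known values
    // and returns a snapshot that contains every metric (the complete dataset).
    public static IReadOnlyDictionary<string, object> Merge(
        IDictionary<string, object> lastKnown,
        IEnumerable<KeyValuePair<string, object>> update)
    {
        // Metrics present in the update overwrite the remembered values.
        foreach (KeyValuePair<string, object> pair in update)
            lastKnown[pair.Key] = pair.Value;

        // Return a copy, so that later updates do not change this snapshot.
        return new Dictionary<string, object>(lastKnown);
    }

    public static void Main()
    {
        // The Birth message specifies all metrics.
        var lastKnown = new Dictionary<string, object>
        {
            ["Temperature"] = 21.5,
            ["Pressure"] = 101.3,
            ["Running"] = true
        };

        // A subsequent Data message contains only the metric that has changed.
        var update = new Dictionary<string, object> { ["Temperature"] = 22.0 };

        // The merged result still contains all three metrics.
        foreach (KeyValuePair<string, object> pair in Merge(lastKnown, update))
            Console.WriteLine($"{pair.Key}: {pair.Value}");
    }
}
```

In this sketch, the snapshot returned after the update contains the new Temperature value together with the Pressure and Running values remembered from the Birth message.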

This functionality makes it easy to process the datasets in most real-world scenarios, such as row-wise logging into a database or a CSV file, or performing calculations on multiple metrics based on one sampling from the device. Rapid Toolkit for Sparkplug is also well suited for scenarios that do not require a complete dataset. Such scenarios can typically be handled by subscribing to individual (or all) metrics (as described earlier in this article), instead of subscribing to a payload.
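
To show why a complete dataset suits row-wise logging, here is a minimal sketch that formats one CSV row from a dataset held in a plain .NET dictionary. The class name CsvRowSketch, the ToCsvRow method and the column names are hypothetical and are not part of Rapid Toolkit for Sparkplug; for brevity, the sketch does not quote or escape values that contain commas.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical illustration only; not a Rapid Toolkit for Sparkplug type.
public static class CsvRowSketch
{
    // Formats one CSV row with the columns in a fixed order.
    // A metric that is missing from the dataset produces an empty cell.
    public static string ToCsvRow(
        IReadOnlyList<string> columns,
        IReadOnlyDictionary<string, object> dataset)
    {
        return string.Join(",", columns.Select(name =>
            dataset.TryGetValue(name, out var value)
                ? Convert.ToString(value, CultureInfo.InvariantCulture)
                : ""));
    }

    public static void Main()
    {
        var columns = new[] { "Temperature", "Pressure", "Running" };

        // A complete dataset fills every column of the row.
        var complete = new Dictionary<string, object>
        {
            ["Temperature"] = 22.0,
            ["Pressure"] = 101.3,
            ["Running"] = true
        };
        Console.WriteLine(ToCsvRow(columns, complete));

        // An incomplete dataset leaves gaps that the logging code would have to fill in.
        var incomplete = new Dictionary<string, object> { ["Temperature"] = 22.5 };
        Console.WriteLine(ToCsvRow(columns, incomplete));
    }
}
```

With a complete dataset, every row can be written as is; with an incomplete dataset, the code that writes the rows would need to remember and fill in the missing values itself.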

If you want to subscribe to payloads as they are received, and want to deal with incomplete datasets, you can set the DeliverCompleteDataSet property in the subscription arguments to false. This causes the subscription to deliver a payload object with only the metrics that were actually contained in the received message.

The following example illustrates the code that can be used to subscribe to incomplete datasets.

Example

.NET

// This example shows how to receive a payload with only the metrics that are actually contained in the message.
//
// In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using System;
using System.Collections.Generic;
using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;

namespace SparkplugDocExamples.Consumer._EasySparkplugConsumer
{
    class DeliverCompleteDataSet
    {
        public static void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the consumer object and hook events.
            var consumer = new EasySparkplugConsumer();
            consumer.PayloadNotification += consumer_Main1_PayloadNotification;

            Console.WriteLine("Subscribing...");
            consumer.SubscribePayload(
                new EasySparkplugPayloadSubscriptionArguments(
                    hostDescriptor, 
                    new SparkplugEdgeNodeDescriptor("easyGroup", "easySparkplugDemo"))
                {
                    // Specify that we do not want the component to fill in metrics that have not been sent in the message.
                    DeliverCompleteDataSet = false
                });

            Console.WriteLine("Processing notifications for 20 seconds...");
            System.Threading.Thread.Sleep(20 * 1000);

            Console.WriteLine("Unsubscribing...");
            consumer.UnsubscribeAllPayloads();

            Console.WriteLine("Waiting for 5 seconds...");
            System.Threading.Thread.Sleep(5 * 1000);

            Console.WriteLine("Finished.");
        }


        static void consumer_Main1_PayloadNotification(object sender, EasySparkplugPayloadNotificationEventArgs eventArgs)
        {
            // Handle different types of notifications.
            Console.WriteLine();
            switch (eventArgs.NotificationType)
            {
                case SparkplugNotificationType.Connect:
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.");
                    break;
                case SparkplugNotificationType.Disconnect:
                    Console.WriteLine("Disconnected from Sparkplug host.");
                    break;
                case SparkplugNotificationType.Data:
                case SparkplugNotificationType.Birth:
                    Console.WriteLine("Received birth or data message from Sparkplug host.");
                    // Display the metric name and data for each metric delivered in the payload.
                    foreach (KeyValuePair<string, SparkplugMetricElement> pair in eventArgs.Payload)
                        Console.WriteLine($"{pair.Key}: {pair.Value.MetricData}");
                    break;
                case SparkplugNotificationType.Death:
                    Console.WriteLine("Received death message from Sparkplug host.");
                    break;
            }
            if (!eventArgs.Succeeded)
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}");
        }
    }
}
' This example shows how to receive a payload with only the metrics that are actually contained in the message.
'
' In order to publish or observe messages for this example, start the SparkplugEdgeNodeConsoleDemo program first.
'
' Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
' Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
' Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
' a commercial license in order to use Online Forums, and we reply to every post.

Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel
Imports System.Collections.Generic

Namespace Global.SparkplugDocExamples.Consumer._EasySparkplugConsumer
    Class DeliverCompleteDataSet
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the consumer object and hook events.
            Dim consumer = New EasySparkplugConsumer()
            AddHandler consumer.PayloadNotification, AddressOf consumer_Main1_PayloadNotification

            Console.WriteLine("Subscribing...")
            consumer.SubscribePayload(
                New EasySparkplugPayloadSubscriptionArguments(
                    hostDescriptor,
                    New SparkplugEdgeNodeDescriptor("easyGroup", "easySparkplugDemo")) _
                        With { _
                               _ ' Specify that we do not want the component to fill in metrics that have not been sent in the message.
                            .DeliverCompleteDataSet = False}
                )
            Console.WriteLine("Processing notifications for 20 seconds...")
            Threading.Thread.Sleep(20 * 1000)

            Console.WriteLine("Unsubscribing...")
            consumer.UnsubscribeAllPayloads()

            Console.WriteLine("Waiting for 5 seconds...")
            Threading.Thread.Sleep(5 * 1000)

            Console.WriteLine("Finished.")
        End Sub

        Private Shared Sub consumer_Main1_PayloadNotification(ByVal sender As Object, ByVal eventArgs As EasySparkplugPayloadNotificationEventArgs)
            ' Handle different types of notifications.
            Console.WriteLine()
            Select Case eventArgs.NotificationType
                Case SparkplugNotificationType.Connect
                    Console.WriteLine($"Connected to Sparkplug host, client ID: {eventArgs.ClientId}.")
                Case SparkplugNotificationType.Disconnect
                    Console.WriteLine("Disconnected from Sparkplug host.")
                Case SparkplugNotificationType.Data,
                    SparkplugNotificationType.Birth
                    Console.WriteLine("Received birth or data message from Sparkplug host.")
                    ' Display the metric name and data for each metric delivered in the payload.
                    For Each pair As KeyValuePair(Of String, SparkplugMetricElement) In eventArgs.Payload
                        Console.WriteLine($"{pair.Key}: {pair.Value.MetricData}")
                    Next
                Case SparkplugNotificationType.Death
                    Console.WriteLine("Received death message from Sparkplug host.")
            End Select
            If Not eventArgs.Succeeded Then
                Console.WriteLine($"*** Failure: {eventArgs.ErrorMessageBrief}")
            End If
        End Sub
    End Class
End Namespace

 

 

Sparkplug is a trademark of Eclipse Foundation, Inc. "MQTT" is a trademark of the OASIS Open standards consortium. Other related terms are trademarks of their respective owners. Any use of these terms on this site is for descriptive purposes only and does not imply any sponsorship, endorsement or affiliation.

See Also